home *** CD-ROM | disk | FTP | other *** search
- # Source Generated with Decompyle++
- # File: in.pyo (Python 2.5)
-
- import socket
- from Cipher import Cipher, Cipher_Stack
- from Session import Session
- from M2Crypto import BIO, X509, m2
- import timeout
- import Checker
- from M2Crypto.SSL import SSLError
-
- def _serverPostConnectionCheck(*args, **kw):
- return 1
-
-
class Connection:
    """An SSL connection layered over a socket via the ``m2`` wrapper module.

    Every instance holds an SSL handle created from the supplied context
    (``self.ssl``) and a socket (``self.socket``) that is either passed in
    or created by the constructor.
    """

    # Callable used by connect() after the handshake when no instance-level
    # ``postConnectionCheck`` attribute has been installed.
    clientPostConnectionCheck = Checker.Checker()
    # Callable used by accept() after the handshake when no instance-level
    # ``postConnectionCheck`` attribute has been installed.
    serverPostConnectionCheck = _serverPostConnectionCheck

    # Class-level references to the free functions used by __del__.
    # NOTE(review): presumably cached here so they stay reachable while the
    # interpreter is shutting down -- confirm.
    m2_bio_free = m2.bio_free
    m2_ssl_free = m2.ssl_free

    def __init__(self, ctx, sock=None):
        """Create the SSL handle and attach (or create) the socket.

        ctx  -- context object; its ``ctx`` attribute is passed to
                ``m2.ssl_new``.
        sock -- existing socket to use. When None, a new AF_INET /
                SOCK_STREAM socket is created with SO_REUSEADDR set.
        """
        self.ctx = ctx
        self.ssl = m2.ssl_new(self.ctx.ctx)
        if sock is None:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.socket = sock
        self._fileno = self.socket.fileno()
        # NOTE(review): socket.gettimeout() returns None for a socket in
        # blocking mode, so this is falsy (and read()/write() use the
        # *_nbio helpers) until setblocking() stores a true value --
        # confirm this is intended.
        self.blocking = self.socket.gettimeout()
        self.ssl_close_flag = m2.bio_noclose

    def __del__(self):
        """Free the BIOs and SSL handle (if present) and close the socket.

        Attributes are looked up defensively so that an instance whose
        __init__ failed part-way does not raise from its destructor.
        """
        if getattr(self, 'sslbio', None):
            self.m2_bio_free(self.sslbio)
        if getattr(self, 'sockbio', None):
            self.m2_bio_free(self.sockbio)
        # __init__ sets the flag to m2.bio_noclose, so that is the fallback
        # when construction stopped before the attribute was assigned.
        close_flag = getattr(self, 'ssl_close_flag', m2.bio_noclose)
        if close_flag == m2.bio_noclose and getattr(self, 'ssl', None):
            self.m2_ssl_free(self.ssl)
        sock = getattr(self, 'socket', None)
        if sock is not None:
            sock.close()

    def close(self):
        """Call ``m2.ssl_shutdown`` on the SSL handle."""
        m2.ssl_shutdown(self.ssl)

    def clear(self):
        """Return the result of ``m2.ssl_clear`` on the SSL handle."""
        return m2.ssl_clear(self.ssl)

    def set_shutdown(self, mode):
        """Pass ``mode`` to ``m2.ssl_set_shutdown1`` for this connection."""
        m2.ssl_set_shutdown1(self.ssl, mode)

    def get_shutdown(self):
        """Return the result of ``m2.ssl_get_shutdown``."""
        return m2.ssl_get_shutdown(self.ssl)

    def bind(self, addr):
        """Bind the underlying socket to ``addr``."""
        self.socket.bind(addr)

    def listen(self, qlen=5):
        """Put the underlying socket into listening mode with backlog ``qlen``."""
        self.socket.listen(qlen)

    def ssl_get_error(self, ret):
        """Return ``m2.ssl_get_error`` for the given return code ``ret``."""
        return m2.ssl_get_error(self.ssl, ret)

    def set_bio(self, readbio, writebio):
        """Attach read and write BIO objects (their ``_ptr()`` values) to the SSL handle."""
        m2.ssl_set_bio(self.ssl, readbio._ptr(), writebio._ptr())

    def set_client_CA_list_from_file(self, cafile):
        """Delegate to ``m2.ssl_set_client_CA_list_from_file`` with ``cafile``."""
        m2.ssl_set_client_CA_list_from_file(self.ssl, cafile)

    def set_client_CA_list_from_context(self):
        """Delegate to ``m2.ssl_set_client_CA_list_from_context`` using this connection's context."""
        m2.ssl_set_client_CA_list_from_context(self.ssl, self.ctx.ctx)

    def setup_addr(self, addr):
        """Remember ``addr`` as ``self.addr``."""
        self.addr = addr

    def set_ssl_close_flag(self, flag):
        """Set the close flag consulted by __del__.

        Raises ValueError unless ``flag`` is ``m2.bio_close`` or
        ``m2.bio_noclose``.
        """
        if flag not in (m2.bio_close, m2.bio_noclose):
            raise ValueError('flag must be m2.bio_close or m2.bio_noclose')
        self.ssl_close_flag = flag

    def setup_ssl(self):
        """Create the socket BIO and the SSL BIO and wire them to the SSL handle."""
        # One socket BIO serves as both the read and the write BIO.
        self.sockbio = m2.bio_new_socket(self.socket.fileno(), 0)
        m2.ssl_set_bio(self.ssl, self.sockbio, self.sockbio)
        self.sslbio = m2.bio_new(m2.bio_f_ssl())
        m2.bio_set_ssl(self.sslbio, self.ssl, m2.bio_noclose)

    def _setup_ssl(self, addr):
        """Record ``addr`` and then run setup_ssl()."""
        self.setup_addr(addr)
        self.setup_ssl()

    def set_accept_state(self):
        """Delegate to ``m2.ssl_set_accept_state``."""
        m2.ssl_set_accept_state(self.ssl)

    def accept_ssl(self):
        """Return the result of ``m2.ssl_accept``."""
        return m2.ssl_accept(self.ssl)

    def accept(self):
        """Accept a connection on the socket and run the SSL accept sequence.

        Returns a ``(ssl, addr)`` pair where ``ssl`` is a new Connection
        wrapping the accepted socket and ``addr`` is the address returned
        by ``socket.accept()``.

        Raises Checker.SSLVerificationError when the post connection check
        (instance ``postConnectionCheck`` if set, otherwise
        ``serverPostConnectionCheck``) returns a false value.
        """
        sock, addr = self.socket.accept()
        ssl = Connection(self.ctx, sock)
        ssl.addr = addr
        ssl.setup_ssl()
        ssl.set_accept_state()
        ssl.accept_ssl()
        check = getattr(self, 'postConnectionCheck',
                        self.serverPostConnectionCheck)
        # The certificate to verify belongs to the freshly accepted
        # connection, not to the listening one.
        if check is not None and not check(ssl.get_peer_cert(), ssl.addr[0]):
            raise Checker.SSLVerificationError('post connection check failed')
        return (ssl, addr)

    def set_connect_state(self):
        """Delegate to ``m2.ssl_set_connect_state``."""
        m2.ssl_set_connect_state(self.ssl)

    def connect_ssl(self):
        """Return the result of ``m2.ssl_connect``."""
        return m2.ssl_connect(self.ssl)

    def connect(self, addr):
        """Connect the socket to ``addr`` and run the SSL connect sequence.

        Returns the value of connect_ssl().

        Raises Checker.SSLVerificationError when the post connection check
        (instance ``postConnectionCheck`` if set, otherwise
        ``clientPostConnectionCheck``) returns a false value.
        """
        self.socket.connect(addr)
        self.addr = addr
        self.setup_ssl()
        self.set_connect_state()
        ret = self.connect_ssl()
        check = getattr(self, 'postConnectionCheck',
                        self.clientPostConnectionCheck)
        if check is not None and not check(self.get_peer_cert(), self.addr[0]):
            raise Checker.SSLVerificationError('post connection check failed')
        return ret

    def shutdown(self, how):
        """Pass ``how`` to ``m2.ssl_set_shutdown`` for this connection."""
        m2.ssl_set_shutdown(self.ssl, how)

    def renegotiate(self):
        """Return the result of ``m2.ssl_renegotiate``."""
        return m2.ssl_renegotiate(self.ssl)

    def pending(self):
        """Return the result of ``m2.ssl_pending``."""
        return m2.ssl_pending(self.ssl)

    def _write_bio(self, data):
        """Write ``data`` with ``m2.ssl_write`` and return its result."""
        return m2.ssl_write(self.ssl, data)

    def _write_nbio(self, data):
        """Write ``data`` with ``m2.ssl_write_nbio`` and return its result."""
        return m2.ssl_write_nbio(self.ssl, data)

    def _read_bio(self, size=1024):
        """Read up to ``size`` with ``m2.ssl_read``.

        Raises ValueError when ``size`` is not positive.
        """
        if size <= 0:
            raise ValueError('size <= 0')
        return m2.ssl_read(self.ssl, size)

    def _read_nbio(self, size=1024):
        """Read up to ``size`` with ``m2.ssl_read_nbio``.

        Raises ValueError when ``size`` is not positive.
        """
        if size <= 0:
            raise ValueError('size <= 0')
        return m2.ssl_read_nbio(self.ssl, size)

    def write(self, data):
        """Write ``data`` using _write_bio() when ``self.blocking`` is true, else _write_nbio()."""
        if self.blocking:
            return self._write_bio(data)
        return self._write_nbio(data)

    # Socket-style aliases for write().
    sendall = send = write

    def read(self, size=1024):
        """Read using _read_bio() when ``self.blocking`` is true, else _read_nbio()."""
        if self.blocking:
            return self._read_bio(size)
        return self._read_nbio(size)

    # Socket-style alias for read().
    recv = read

    def setblocking(self, mode):
        """Set the socket's blocking mode and remember ``mode`` in ``self.blocking``."""
        self.socket.setblocking(mode)
        self.blocking = mode

    def fileno(self):
        """Return the underlying socket's file descriptor."""
        return self.socket.fileno()

    def getsockopt(self, *args):
        """Forward all arguments to the underlying socket's getsockopt()."""
        return self.socket.getsockopt(*args)

    def setsockopt(self, *args):
        """Forward all arguments to the underlying socket's setsockopt()."""
        return self.socket.setsockopt(*args)

    def get_context(self):
        """Return the result of ``m2.ssl_get_ssl_ctx``."""
        return m2.ssl_get_ssl_ctx(self.ssl)

    def get_state(self):
        """Return the result of ``m2.ssl_get_state``."""
        return m2.ssl_get_state(self.ssl)

    def verify_ok(self):
        """Return whether the verify result equals ``m2.X509_V_OK``."""
        return m2.ssl_get_verify_result(self.ssl) == m2.X509_V_OK

    def get_verify_mode(self):
        """Return the result of ``m2.ssl_get_verify_mode``."""
        return m2.ssl_get_verify_mode(self.ssl)

    def get_verify_depth(self):
        """Return the result of ``m2.ssl_get_verify_depth``."""
        return m2.ssl_get_verify_depth(self.ssl)

    def get_verify_result(self):
        """Return the result of ``m2.ssl_get_verify_result``."""
        return m2.ssl_get_verify_result(self.ssl)

    def get_peer_cert(self):
        """Return the peer certificate wrapped in X509.X509, or None if there is none."""
        c = m2.ssl_get_peer_cert(self.ssl)
        if c is None:
            return None
        # NOTE(review): the second argument (1) is presumably the
        # "free on delete" flag of X509.X509 -- confirm.
        return X509.X509(c, 1)

    def get_peer_cert_chain(self):
        """Return the peer certificate chain as X509.X509_Stack, or None if there is none."""
        c = m2.ssl_get_peer_cert_chain(self.ssl)
        if c is None:
            return None
        return X509.X509_Stack(c)

    def get_cipher(self):
        """Return the current cipher wrapped in Cipher, or None if there is none."""
        c = m2.ssl_get_current_cipher(self.ssl)
        if c is None:
            return None
        return Cipher(c)

    def get_ciphers(self):
        """Return the cipher list wrapped in Cipher_Stack, or None if there is none."""
        c = m2.ssl_get_ciphers(self.ssl)
        if c is None:
            return None
        return Cipher_Stack(c)

    def get_cipher_list(self, idx=0):
        """Return the result of ``m2.ssl_get_cipher_list`` for index ``idx``."""
        return m2.ssl_get_cipher_list(self.ssl, idx)

    def set_cipher_list(self, cipher_list):
        """Return the result of ``m2.ssl_set_cipher_list`` for ``cipher_list``."""
        return m2.ssl_set_cipher_list(self.ssl, cipher_list)

    def makefile(self, mode='rb', bufsize='ignored'):
        """Return a BIO.IOBuffer over this connection's SSL BIO.

        mode    -- file-style mode string. 'r' or '+' requests reading;
                   'w', 'a' or '+' requests writing; 'b' requests binary.
                   The buffer mode is assembled in the order 'r', 'w', 'b'.
        bufsize -- accepted for signature compatibility; not used.
        """
        readable = 'r' in mode or '+' in mode
        writable = 'w' in mode or 'a' in mode or '+' in mode
        binary = 'b' in mode
        m2mode = ''
        if readable:
            m2mode = m2mode + 'r'
        if writable:
            m2mode = m2mode + 'w'
        if binary:
            m2mode = m2mode + 'b'
        bio = BIO.BIO(self.sslbio, _close_cb=self.close)
        m2.bio_do_handshake(bio._ptr())
        return BIO.IOBuffer(bio, m2mode, _pyfree=0)

    def getsockname(self):
        """Return the underlying socket's own address."""
        return self.socket.getsockname()

    def getpeername(self):
        """Return the address the underlying socket is connected to."""
        return self.socket.getpeername()

    def set_session_id_ctx(self, id):
        """Set the session id context via ``m2.ssl_set_session_id_context``.

        Raises SSLError, carrying the m2 error reason string, when the call
        returns a false value.
        """
        ret = m2.ssl_set_session_id_context(self.ssl, id)
        if not ret:
            raise SSLError(m2.err_reason_error_string(m2.err_get_error()))

    def get_session(self):
        """Return the current session wrapped in a Session object."""
        sess = m2.ssl_get_session(self.ssl)
        return Session(sess)

    def set_session(self, session):
        """Install ``session`` (its ``_ptr()`` value) on the SSL handle."""
        m2.ssl_set_session(self.ssl, session._ptr())

    def get_default_session_timeout(self):
        """Return the result of ``m2.ssl_get_default_session_timeout``."""
        return m2.ssl_get_default_session_timeout(self.ssl)

    def get_socket_read_timeout(self):
        """Return the socket's SO_RCVTIMEO value converted by ``timeout.struct_to_timeout``."""
        # NOTE(review): 8 is the buffer length requested from getsockopt;
        # presumably the size of the packed timeout struct -- confirm it is
        # large enough on every target platform.
        raw = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_RCVTIMEO, 8)
        return timeout.struct_to_timeout(raw)

    def get_socket_write_timeout(self):
        """Return the socket's SO_SNDTIMEO value converted by ``timeout.struct_to_timeout``."""
        # NOTE(review): 8 is the buffer length requested from getsockopt;
        # presumably the size of the packed timeout struct -- confirm it is
        # large enough on every target platform.
        raw = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_SNDTIMEO, 8)
        return timeout.struct_to_timeout(raw)

    def set_socket_read_timeout(self, timeo):
        """Set SO_RCVTIMEO on the socket from ``timeo.pack()``."""
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_RCVTIMEO,
                               timeo.pack())

    def set_socket_write_timeout(self, timeo):
        """Set SO_SNDTIMEO on the socket from ``timeo.pack()``."""
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_SNDTIMEO,
                               timeo.pack())

    def get_version(self):
        """Return the result of ``m2.ssl_get_version``."""
        return m2.ssl_get_version(self.ssl)

    def set_post_connection_check_callback(self, postConnectionCheck):
        """Install an instance-level check used by accept() and connect().

        Passing None disables the check, since both methods skip it when
        the looked-up callable is None.
        """
        self.postConnectionCheck = postConnectionCheck
-
-